/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.index.examples;

import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.index.Segment;
import org.apache.carbondata.core.index.dev.IndexWriter;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.IndexSchema;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.core.util.CarbonUtil;

import com.google.gson.Gson;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;

/**
 * Example index writer that tracks the minimum and maximum value of every indexed column for
 * each blocklet and, on {@link #finish()}, serializes the collected values as JSON into a
 * {@code <shardName>.minmaxindex} file under the index path.
 *
 * <p>Only STRING, BYTE_ARRAY and INT columns are accepted by
 * {@link #onPageAdded(int, int, int, ColumnPage[])}; any other type is rejected with an
 * {@link UnsupportedOperationException}.
 */
public class MinMaxDataWriter extends IndexWriter {

  // Log under this class's own name so messages can be attributed to the min-max writer.
  private static final Logger LOGGER =
      LogServiceFactory.getLogService(MinMaxDataWriter.class.getName());

  /**
   * Number of leading bytes skipped in STRING / BYTE_ARRAY values before comparing them
   * (the values handed to this writer are prefixed with their length).
   */
  private static final int LENGTH_PREFIX_BYTES = 2;

  /**
   * Running min / max per indexed column. Despite the names, the arrays are re-created in
   * {@link #onBlockletStart(int)} and therefore accumulate over all pages of one blocklet.
   */
  private Object[] pageLevelMin, pageLevelMax;

  /**
   * Min-max values per blocklet id. A sorted map is used so the index file always lists the
   * blocklets in ascending id order, whatever ids were reported.
   */
  private final Map<Integer, BlockletMinMax> blockMinMaxMap = new TreeMap<>();

  private final int columnCnt;
  private final DataType[] dataTypeArray;
  private final String indexShardName;

  /**
   * Since the sequence of indexed columns is defined the same as the order in which the user
   * created them, this maps the column index as created by the user to the column index in the
   * MinMaxIndex.
   * Please note that the sequence of min-max values for each column in blocklet-min-max is not
   * the same as the indexed columns, so the values are reordered while writing them.
   */
  private final Map<Integer, Integer> origin2MinMaxOrdinal = new HashMap<>();

  /**
   * Creates a writer for one shard of the given index.
   *
   * @param carbonTable  table that owns the index; its table path is passed to the parent
   * @param indexSchema  schema of the index; its name is passed to the parent
   * @param segment      segment being written
   * @param shardName    shard name; also used as the base name of the generated index file
   * @param indexColumns columns covered by the index
   */
  public MinMaxDataWriter(CarbonTable carbonTable, IndexSchema indexSchema, Segment segment,
      String shardName, List<CarbonColumn> indexColumns) {
    super(carbonTable.getTablePath(), indexSchema.getIndexName(), indexColumns, segment,
        shardName);
    // Remember the shard name: finish() uses it to name the index file. Leaving this field
    // unset would produce a file literally called "null.minmaxindex".
    this.indexShardName = shardName;
    this.columnCnt = indexColumns.size();
    this.dataTypeArray = new DataType[this.columnCnt];
    for (int i = 0; i < this.columnCnt; i++) {
      CarbonColumn col = indexColumns.get(i);
      this.origin2MinMaxOrdinal.put(col.getSchemaOrdinal(), col.getOrdinal());
      this.dataTypeArray[i] = col.getDataType();
    }
  }

  /** Nothing to prepare per block; the blocklet map is created eagerly with the writer. */
  @Override
  public void onBlockStart(String blockId) {
  }

  @Override
  public void onBlockEnd(String blockId) {
  }

  /** Resets the running min / max accumulators for the blocklet that is about to start. */
  @Override
  public void onBlockletStart(int blockletId) {
    pageLevelMin = new Object[columnCnt];
    pageLevelMax = new Object[columnCnt];
  }

  /** Converts the accumulated min / max values to bytes and records them for the blocklet. */
  @Override
  public void onBlockletEnd(int blockletId) {
    updateCurrentBlockletMinMax(blockletId);
  }

  /**
   * Folds every row of the added page into the running min / max of each indexed column.
   *
   * @throws UnsupportedOperationException if an indexed column is not STRING, BYTE_ARRAY or INT
   */
  @Override
  public void onPageAdded(int blockletId, int pageId, int pageSize, ColumnPage[] pages) {
    // as an example, we don't use page-level min-max generated by native carbondata here, we get
    // the min-max by comparing each row
    for (int rowId = 0; rowId < pageSize; rowId++) {
      for (int colIdx = 0; colIdx < columnCnt; colIdx++) {
        Object originValue = pages[colIdx].getData(rowId);
        DataType dataType = dataTypeArray[colIdx];
        if (DataTypes.STRING == dataType || DataTypes.BYTE_ARRAY == dataType) {
          updateBytesMinMax(colIdx, (byte[]) originValue);
        } else if (DataTypes.INT == dataType) {
          updateMinMax(colIdx, originValue, dataType);
        } else {
          throw new UnsupportedOperationException("Not implement yet");
        }
      }
    }
  }

  /**
   * Updates the running min / max of a STRING or BYTE_ARRAY column. The incoming value carries
   * a length prefix which is skipped for the comparison and stripped before storing.
   */
  private void updateBytesMinMax(int colIdx, byte[] prefixedValue) {
    int payloadLength = prefixedValue.length - LENGTH_PREFIX_BYTES;
    byte[] currentMin = (byte[]) pageLevelMin[colIdx];
    if (currentMin == null || ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMin, 0,
        currentMin.length, prefixedValue, LENGTH_PREFIX_BYTES, payloadLength) > 0) {
      pageLevelMin[colIdx] = stripLengthPrefix(prefixedValue);
    }
    byte[] currentMax = (byte[]) pageLevelMax[colIdx];
    if (currentMax == null || ByteUtil.UnsafeComparer.INSTANCE.compareTo(currentMax, 0,
        currentMax.length, prefixedValue, LENGTH_PREFIX_BYTES, payloadLength) < 0) {
      pageLevelMax[colIdx] = stripLengthPrefix(prefixedValue);
    }
  }

  /** Returns a copy of the value without its leading length prefix. */
  private static byte[] stripLengthPrefix(byte[] prefixedValue) {
    byte[] payload = new byte[prefixedValue.length - LENGTH_PREFIX_BYTES];
    System.arraycopy(prefixedValue, LENGTH_PREFIX_BYTES, payload, 0, payload.length);
    return payload;
  }

  /**
   * Updates the running min / max of a numeric column.
   *
   * @throws RuntimeException if the data type is not SHORT, INT, LONG or DOUBLE
   */
  private void updateMinMax(int colIdx, Object originValue, DataType dataType) {
    // The first value seen for a column is both its min and its max.
    if (pageLevelMin[colIdx] == null) {
      pageLevelMin[colIdx] = originValue;
    }
    if (pageLevelMax[colIdx] == null) {
      pageLevelMax[colIdx] = originValue;
    }
    if (compareValues(pageLevelMin[colIdx], originValue, dataType) > 0) {
      pageLevelMin[colIdx] = originValue;
    }
    if (compareValues(pageLevelMax[colIdx], originValue, dataType) < 0) {
      pageLevelMax[colIdx] = originValue;
    }
  }

  /**
   * Compares two boxed values of the given numeric type without subtracting them, because
   * {@code a - b} overflows for int / long operands that are far apart and then reports the
   * wrong ordering.
   *
   * @return a negative number, zero or a positive number when {@code left} is smaller than,
   *         equal to or greater than {@code right}
   */
  private static int compareValues(Object left, Object right, DataType dataType) {
    if (DataTypes.SHORT == dataType) {
      return Short.compare((short) left, (short) right);
    } else if (DataTypes.INT == dataType) {
      return Integer.compare((int) left, (int) right);
    } else if (DataTypes.LONG == dataType) {
      return Long.compare((long) left, (long) right);
    } else if (DataTypes.DOUBLE == dataType) {
      double l = (double) left;
      double r = (double) right;
      // Plain relational operators keep NaN unordered, i.e. a NaN never replaces a min / max.
      return l > r ? 1 : (l < r ? -1 : 0);
    } else {
      // todo:
      throw new RuntimeException("Not implemented yet");
    }
  }

  /**
   * Serializes the running min / max of every indexed column to bytes, places them at their
   * target position and stores the result under the given blocklet id.
   */
  private void updateCurrentBlockletMinMax(int blockletId) {
    byte[][] max = new byte[this.columnCnt][];
    byte[][] min = new byte[this.columnCnt][];
    for (int i = 0; i < this.columnCnt; i++) {
      // NOTE(review): the map is keyed by schema ordinal but looked up by the position of the
      // column in the indexed-column list; this presumably relies on both being equal - confirm.
      Integer targetColIdx = origin2MinMaxOrdinal.get(i);
      if (targetColIdx == null) {
        throw new IllegalStateException("No min-max ordinal registered for indexed column " + i);
      }
      max[targetColIdx] = CarbonUtil.getValueAsBytes(this.dataTypeArray[i], pageLevelMax[i]);
      min[targetColIdx] = CarbonUtil.getValueAsBytes(this.dataTypeArray[i], pageLevelMin[i]);
    }

    BlockletMinMax blockletMinMax = new BlockletMinMax();
    blockletMinMax.setMax(max);
    blockletMinMax.setMin(min);
    blockMinMaxMap.put(blockletId, blockletMinMax);
  }

  /**
   * Writes the collected min-max values to the index file named after {@code blockId}.
   * Delegates to {@link #constructMinMaxIndex(String)}.
   *
   * @param blockId base name of the index file
   */
  public void updateMinMaxIndex(String blockId) {
    constructMinMaxIndex(blockId);
  }

  /**
   * Construct the Min Max Index and write it to its file. A write failure is logged together
   * with its cause and is not propagated to the caller.
   *
   * @param blockId base name of the index file
   */
  public void constructMinMaxIndex(String blockId) {
    // construct Min and Max values of each Blocklets present inside a block.
    List<MinMaxIndexBlockDetails> blockDetails = loadBlockDetails();
    try {
      writeMinMaxIndexFile(blockDetails, blockId);
    } catch (IOException ex) {
      // Keep the cause in the log; without it a missing index file cannot be diagnosed.
      LOGGER.error("Unable to write the min-max index file for " + blockId, ex);
    }
  }

  /**
   * Copies the recorded blocklet min-max values into {@link MinMaxIndexBlockDetails} objects,
   * one per blocklet, in ascending blocklet id order.
   */
  private List<MinMaxIndexBlockDetails> loadBlockDetails() {
    List<MinMaxIndexBlockDetails> minMaxIndexBlockDetails =
        new ArrayList<>(blockMinMaxMap.size());
    // Iterate over the recorded entries rather than assuming the ids are exactly 0..size-1.
    for (Map.Entry<Integer, BlockletMinMax> entry : blockMinMaxMap.entrySet()) {
      MinMaxIndexBlockDetails details = new MinMaxIndexBlockDetails();
      details.setMinValues(entry.getValue().getMin());
      details.setMaxValues(entry.getValue().getMax());
      details.setBlockletId(entry.getKey());
      minMaxIndexBlockDetails.add(details);
    }
    return minMaxIndexBlockDetails;
  }

  /**
   * Write the data to a file. This is JSON format file. The file is committed only after it
   * has been written and closed successfully.
   *
   * @param minMaxIndexBlockDetails min-max details to serialize
   * @param blockId base name of the index file
   * @throws IOException if the file cannot be created, written, closed or committed
   */
  public void writeMinMaxIndexFile(List<MinMaxIndexBlockDetails> minMaxIndexBlockDetails,
      String blockId) throws IOException {
    String filePath = indexPath + File.separator + blockId + ".minmaxindex";
    FileFactory.createNewFile(filePath);
    // try-with-resources flushes and closes both streams even when the write fails.
    try (DataOutputStream dataOutStream = FileFactory.getDataOutputStream(filePath);
        BufferedWriter brWriter =
            new BufferedWriter(new OutputStreamWriter(dataOutStream, "UTF-8"))) {
      brWriter.write(new Gson().toJson(minMaxIndexBlockDetails));
    }
    // Reached only on success, so a partially written file is never committed.
    commitFile(filePath);
  }

  /** Writes the index file for this shard, named after the shard name given at construction. */
  @Override
  public void finish() throws IOException {
    updateMinMaxIndex(indexShardName);
  }

  /**
   * create and return path that will store the index
   *
   * @param dataPath path to store the carbondata factdata
   * @param indexName index name
   * @return path to store the index
   * @throws IOException if the directory does not exist and cannot be created
   */
  public static String genIndexStorePath(String dataPath, String indexName)
      throws IOException {
    String dmDir = dataPath + File.separator + indexName;
    Path dmPath = FileFactory.getPath(dmDir);
    FileSystem fs = FileFactory.getFileSystem(dmPath);
    // Do not hand back a path that could not be created.
    if (!fs.exists(dmPath) && !fs.mkdirs(dmPath)) {
      throw new IOException("Failed to create index store directory " + dmDir);
    }
    return dmDir;
  }
}